Skip to content

feat(asyncio): add Reader API#309

Open
BewareMyPower wants to merge 3 commits into
apache:mainfrom
BewareMyPower:bewaremypower/async-reader
Open

feat(asyncio): add Reader API#309
BewareMyPower wants to merge 3 commits into
apache:mainfrom
BewareMyPower:bewaremypower/async-reader

Conversation

@BewareMyPower

Copy link
Copy Markdown
Contributor

No description provided.

@BewareMyPower BewareMyPower self-assigned this Jun 17, 2026
@BewareMyPower BewareMyPower added this to the 3.13.0 milestone Jun 17, 2026
@BewareMyPower BewareMyPower marked this pull request as draft June 17, 2026 08:02
@BewareMyPower BewareMyPower marked this pull request as ready for review June 18, 2026 08:21
@RobertIndie RobertIndie requested a review from Copilot June 23, 2026 07:58

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Note

Copilot couldn't run its full agentic review because no GitHub Actions runner was available. Make sure your repository has a runner available to run Copilot's review, or add a copilot-setup-steps.yml file specifying one with the runs-on attribute. See the docs for more details.

Adds an asyncio-compatible Reader API to the Python Pulsar client, including C++ binding support and new integration tests.

Changes:

  • Introduced pulsar.asyncio.Reader with async read_next, seek, has_message_available, and close.
  • Added pybind11 C++ exports for Reader async methods and Client async reader creation.
  • Added asyncio test coverage for basic reader flows (start positions, seek, message availability, auth failure).

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 6 comments.

File Description
tests/asyncio_test.py Adds asyncio integration tests validating new Reader behaviors (read, seek, has-message, auth).
src/reader.cc Exposes Reader async operations (read_next_async, seek_async, etc.) via pybind11 with GIL release.
src/client.cc Exposes async reader creation APIs (create_reader_async, create_reader_async_v2) for asyncio wrapper use.
pulsar/asyncio.py Implements the new Reader wrapper and Client.create_reader() coroutine.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread pulsar/asyncio.py
self._reader = reader
self._schema = schema

async def read_next(self, timeout_millis: int | None = None) -> pulsar.Message:
Comment thread pulsar/asyncio.py
Comment on lines +492 to +498
future = asyncio.get_running_loop().create_future()
if timeout_millis is None:
self._reader.read_next_async(functools.partial(_set_future, future))
else:
_check_type(int, timeout_millis, 'timeout_millis')
self._reader.read_next_async(functools.partial(_set_future, future))
msg = await future
Comment thread src/reader.cc
Comment on lines +58 to +61
void Reader_readNextAsync(Reader& reader, ReadNextCallback callback) {
py::gil_scoped_release release;
reader.readNextAsync(callback);
}
Comment thread src/reader.cc
Comment on lines 87 to +90
.def("topic", &Reader::getTopic, return_value_policy::copy)
.def("read_next", &Reader_readNext)
.def("read_next", &Reader_readNextTimeout)
.def("read_next_async", &Reader_readNextAsync)
Comment thread tests/asyncio_test.py
msg = await consumer.receive()
self.assertEqual(msg.data(), b'msg-3')

async def test_reader_simple(self):
Comment thread tests/asyncio_test.py
Comment on lines +479 to +480
with self.assertRaises(asyncio.TimeoutError):
await asyncio.wait_for(reader.read_next(), 1)
Comment thread pulsar/asyncio.py
PulsarException
"""
future = asyncio.get_running_loop().create_future()
if timeout_millis is None:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems the timeout_millis parameter is useless.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants